package com.bieber.computer.splitter;

import au.com.bytecode.opencsv.CSVReader;
import com.bieber.common.Constants;
import com.bieber.common.model.ConsumerPerson;
import com.bieber.computer.ComputerConfig;
import org.apache.commons.lang.StringUtils;
import org.apache.ignite.Ignite;

import java.io.*;

/**
 * Created by bieber on 2015/9/17.
 */
public class PersonSplitter extends DataSplitter<ConsumerPerson> {

    private CSVReader reader;
    
    public PersonSplitter(ComputerConfig config, Ignite ignite) {
        super(config, ignite);
        if(StringUtils.isEmpty(config.getPersonCSVFile())){
            throw new IllegalArgumentException("please configure personCSVFile property");
        }
        try {
            reader = new CSVReader(new InputStreamReader(new FileInputStream(new File(config.getPersonCSVFile()))));
        } catch (FileNotFoundException e) {
            throw new IllegalArgumentException("person csv file "+config.getOrderCSVFile()+" not exist",e);
        }
    }

    //csv file format id,name
    @Override
    protected void split() {
        initCache(Constants.PERSON_CACHE, ConsumerPerson.class);
        while(true){
            try {
                String[] data = reader.readNext();
                if(data==null){
                    isFinished=true;
                    break;
                }
                if(data.length!=2){
                    throw new IllegalArgumentException("person csv file column length must be 2");
                }
                ConsumerPerson person = new ConsumerPerson();
                person.setId(Integer.parseInt(data[0]));
                person.setName(data[1]);
                dataCache.putIfAbsent(person.getId(),person);
            } catch (IOException e) {
                throw new IllegalStateException("failed to read csv file ",e);
            }
        }
    }


}
